Flink SQL ব্যবহার করে একটি Data Analytics প্রোজেক্ট তৈরি করা অত্যন্ত কার্যকর, কারণ এটি স্ট্রিম ডেটা প্রসেসিং এবং ব্যাচ ডেটা এনালাইটিক্স উভয়ের জন্য SQL সিনট্যাক্স ব্যবহার করে ডেটা প্রসেসিংকে সহজ করে। Flink SQL ব্যবহার করে ডেটা সোর্স থেকে ডেটা পড়া, ট্রান্সফর্মেশন করা, এবং বিভিন্ন ধরনের এনালাইটিক্স করা সম্ভব। এখানে, আমরা একটি উদাহরণ প্রোজেক্ট তৈরি করব যা একটি রিয়েল-টাইম ডেটা স্ট্রিম প্রসেস করবে এবং SQL ব্যবহার করে কিছু এনালাইটিক্স সম্পাদন করবে।
আমাদের উদাহরণ প্রোজেক্টটি একটি ই-কমার্স সাইটের রিয়েল-টাইম অর্ডার ডেটা প্রসেস করবে। আমরা নিম্নোক্ত কাজগুলো সম্পাদন করব:
প্রথমে, একটি Flink SQL Environment তৈরি করতে হবে। আমরা এখানে একটি Java API উদাহরণ ব্যবহার করছি, তবে Flink SQL CLI থেকেও একই কাজ করা সম্ভব।
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class FlinkSQLAnalytics {
public static void main(String[] args) {
// Execution এবং Table Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Flink SQL কোয়েরি এবং ডেটা প্রসেসিং এখানে হবে
}
}
Flink SQL এ, আমরা একটি Kafka সোর্স ব্যবহার করে অর্ডার ডেটা পড়ব। Kafka এর মাধ্যমে প্রতিটি অর্ডার একটি JSON ফরম্যাটে স্ট্রিম করা হবে।
String kafkaSourceDDL = "CREATE TABLE orders (" +
" order_id STRING," +
" product_id STRING," +
" quantity INT," +
" price DOUBLE," +
" order_time TIMESTAMP(3)," +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'ecommerce_orders'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")";
tableEnv.executeSql(kafkaSourceDDL);
বর্ণনা:
orders
টেবিল রেজিস্টার করা হয়েছে যা Kafka থেকে ডেটা পড়ে।WATERMARK
ব্যবহার করে order_time
ফিল্ডে টাইম উইন্ডো ম্যানেজ করা হয়েছে।Flink SQL ব্যবহার করে বিভিন্ন এনালাইটিক্স কোয়েরি চালানো হবে।
SELECT product_id, SUM(quantity * price) AS total_sales
FROM orders
GROUP BY product_id;
বর্ণনা: এই কোয়েরি প্রতিটি প্রোডাক্টের জন্য মোট বিক্রয় হিসাব করে।
SELECT
product_id,
TUMBLE_START(order_time, INTERVAL '10' SECOND) AS window_start,
TUMBLE_END(order_time, INTERVAL '10' SECOND) AS window_end,
SUM(quantity * price) AS total_sales
FROM orders
GROUP BY
product_id,
TUMBLE(order_time, INTERVAL '10' SECOND);
বর্ণনা: এই কোয়েরি প্রতিটি ১০ সেকেন্ডের উইন্ডোতে প্রতিটি প্রোডাক্টের বিক্রয় সংক্ষেপ হিসাব করে।
SELECT product_id, COUNT(order_id) AS order_count
FROM orders
GROUP BY product_id
ORDER BY order_count DESC
LIMIT 1;
বর্ণনা: এই কোয়েরি সর্বাধিক অর্ডার সংখ্যা বিশিষ্ট প্রোডাক্ট বের করে এবং তা সারণী অনুযায়ী সাজায়।
প্রসেস করা ডেটাকে Flink SQL ব্যবহার করে Kafka বা অন্য কোনও স্টোরেজ সিস্টেমে পাঠানো যায়। এখানে, আমরা Kafka সিংক ব্যবহার করছি।
String kafkaSinkDDL = "CREATE TABLE result_sink (" +
" product_id STRING," +
" total_sales DOUBLE" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'processed_sales'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")";
tableEnv.executeSql(kafkaSinkDDL);
// প্রক্রিয়াকৃত টেবিলকে সিংকে লেখার জন্য SQL
tableEnv.executeSql("INSERT INTO result_sink SELECT product_id, SUM(quantity * price) AS total_sales FROM orders GROUP BY product_id");
বর্ণনা:
result_sink
নামে একটি Kafka সিংক টেবিল তৈরি করা হয়েছে, যেখানে প্রক্রিয়াকৃত ডেটা পাঠানো হচ্ছে।INSERT INTO
কমান্ড ব্যবহার করে SQL কোয়েরি সিংকে রেজাল্ট পাঠাচ্ছে।Flink এর SQL কোয়েরিগুলো চালানোর পর আপনি Flink এর ড্যাশবোর্ড থেকে টাস্ক এবং ডেটা প্রসেসিং মনিটর করতে পারেন। এছাড়া, আপনি Kafka Consumer ব্যবহার করে প্রক্রিয়াকৃত ডেটা স্ট্রিম দেখতেও পারেন।
# Kafka Consumer দিয়ে আউটপুট মনিটর করা
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processed_sales --from-beginning
Apache Flink SQL ব্যবহার করে এই প্রোজেক্টে আমরা দেখলাম কীভাবে একটি রিয়েল-টাইম ডেটা স্ট্রিম থেকে এনালাইটিক্স করা যায়। Flink SQL এর মাধ্যমে সহজেই ডেটা সোর্স রেজিস্টার করে এবং বিভিন্ন ট্রান্সফরমেশন ও অ্যানালাইটিক্স করা সম্ভব। Flink SQL এর ক্ষমতা বড় আকারের ডেটা প্রসেসিং এবং অ্যানালাইটিক্স প্রোজেক্টে অত্যন্ত কার্যকর।